---
title: "Workflows"
description: ""
icon: "diagram-project"
---

## Overview

Workflows are a flexible, extensible component for managing and executing structured sequences of tasks. They offer:

- 🔄 **Dynamic Execution:** Steps can direct the flow based on state or results
- ✅ **Validation:** Define schemas for data consistency and type safety
- 🧩 **Modularity:** Steps can be standalone or invoke nested workflows
- 👁️ **Observability:** Emit events during execution to track progress or handle errors

<Note>
Supported in Python and TypeScript.
</Note>

---

## Core Concepts

### State

State is the central data structure in a workflow. It is defined by a schema (a Pydantic model in Python, a Zod schema in TypeScript) that:
- Holds the data passed between steps
- Provides type validation and safety
- Persists throughout the workflow execution

### Steps

Steps are the building blocks of a workflow. Each step is a function that:
- Takes the current state as input
- Can modify the state
- Returns the name of the next step to execute or a special reserved value

### Transitions

Transitions determine the flow of execution between steps. Each step returns one of the following:
- The name of the next step to execute
- `Workflow.NEXT` - proceed to the next step in order
- `Workflow.SELF` - repeat the current step
- `Workflow.PREV` - return to the previous step
- `Workflow.END` - end the workflow execution
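
To make the transition rules concrete, here is a toy, dependency-free model of how a runner might resolve a step's return value. It is a sketch, not the framework's implementation: the `run` helper, the sentinel strings, and the dataclass `State` are hypothetical stand-ins, and treating a missing return value as "next" is an assumption based on the simple example later on this page.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional

# Hypothetical sentinels for this sketch; the framework exposes its own
# reserved values such as Workflow.NEXT, Workflow.SELF, and Workflow.END.
NEXT, SELF, END = "__next__", "__self__", "__end__"


@dataclass
class State:
    count: int = 0
    visited: list[str] = field(default_factory=list)


Step = Callable[[State], Optional[str]]


def run(steps: dict[str, Step], state: State) -> State:
    """Start at the first registered step and follow each returned transition."""
    names = list(steps)
    current = names[0]
    while True:
        state.visited.append(current)
        # Assumption: a step that returns nothing proceeds to the next step.
        result = steps[current](state) or NEXT
        if result == END:
            return state
        if result == SELF:
            continue  # repeat the current step
        if result == NEXT:
            index = names.index(current) + 1
            if index == len(names):
                return state  # no step left to run
            current = names[index]
        else:
            current = result  # jump to an explicitly named step


def first(state: State) -> None:
    return None  # falls through to the next step


def count(state: State) -> str:
    state.count += 1
    return SELF if state.count < 3 else "last"


def skipped(state: State) -> str:
    return NEXT  # never reached: "count" jumps straight to "last"


def last(state: State) -> str:
    return END


final = run({"first": first, "count": count, "skipped": skipped, "last": last}, State())
print(final.visited)  # ['first', 'count', 'count', 'count', 'last']
```

The `count` step repeats itself twice via `SELF`, then names `"last"` directly, which is why `skipped` never appears in the visited list.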

---

## Basic Usage

### Simple Workflow

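The examples below demonstrate minimal workflows. The Python example runs three steps in sequence, a pattern suited to straightforward, linear processes where each step builds on the previous one. The TypeScript example adds a random jump back with `Workflow.PREV` and logs events as steps start and finish.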

<CodeGroup>

{/* <!-- embedme python/examples/workflows/simple.py --> */}
```py Python [expandable]
import asyncio
import sys
import traceback

from pydantic import BaseModel

from beeai_framework.errors import FrameworkError
from beeai_framework.workflows import Workflow


async def main() -> None:
    # State
    class State(BaseModel):
        input: str

    workflow = Workflow(State)
    workflow.add_step("first", lambda state: print("Running first step!"))
    workflow.add_step("second", lambda state: print("Running second step!"))
    workflow.add_step("third", lambda state: print("Running third step!"))

    await workflow.run(State(input="Hello"))


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

```

{/* <!-- embedme typescript/examples/workflows/simple.ts --> */}
```ts TypeScript [expandable]
import { Workflow } from "beeai-framework/workflows/workflow";
import { z } from "zod";

const schema = z.object({
  hops: z.number().default(0),
});

const workflow = new Workflow({ schema })
  .addStep("a", async (state) => {
    state.hops += 1;
  })
  .addStep("b", () => (Math.random() > 0.5 ? Workflow.PREV : Workflow.END));

const response = await workflow.run({ hops: 0 }).observe((emitter) => {
  emitter.on("start", (data) => console.log(`-> start ${data.step}`));
  emitter.on("error", (data) => console.log(`-> error ${data.step}`));
  emitter.on("success", (data) => console.log(`-> finish ${data.step}`));
});

console.log(`Hops: ${response.result.hops}`);
console.log(`-> steps`, response.steps.map((step) => step.name).join(","));

```

</CodeGroup>

---

## Advanced Features

### Multi-Step and Nested Workflows

The Python example implements multiplication through repeated addition, demonstrating control flow, state manipulation, and conditional logic. The TypeScript example shows nesting: a parent workflow delegates to one of two child workflows wrapped with `asStep`.

Workflow nesting allows complex behaviors to be encapsulated as reusable components, enabling hierarchical composition of workflows. This promotes modularity, reusability, and better organization of complex agent logic.

<CodeGroup>
{/* <!-- embedme python/examples/workflows/nesting.py --> */}
```py Python [expandable]
import asyncio
import sys
import traceback
from typing import Literal, TypeAlias

from pydantic import BaseModel

from beeai_framework.errors import FrameworkError
from beeai_framework.workflows import Workflow, WorkflowReservedStepName

WorkflowStep: TypeAlias = Literal["pre_process", "add_loop", "post_process"]


async def main() -> None:
    # State
    class State(BaseModel):
        x: int
        y: int
        abs_repetitions: int | None = None
        result: int | None = None

    def pre_process(state: State) -> WorkflowStep:
        print("pre_process")
        state.abs_repetitions = abs(state.y)
        return "add_loop"

    def add_loop(state: State) -> WorkflowStep | WorkflowReservedStepName:
        if state.abs_repetitions and state.abs_repetitions > 0:
            result = (state.result if state.result is not None else 0) + state.x
            abs_repetitions = (state.abs_repetitions if state.abs_repetitions is not None else 0) - 1
            print(f"add_loop: intermediate result {result}")
            state.abs_repetitions = abs_repetitions
            state.result = result
            return Workflow.SELF
        else:
            return "post_process"

    def post_process(state: State) -> WorkflowReservedStepName:
        print("post_process")
        if state.y < 0:
            result = -(state.result if state.result is not None else 0)
            state.result = result
        return Workflow.END

    multiplication_workflow = Workflow[State, WorkflowStep](name="MultiplicationWorkflow", schema=State)
    multiplication_workflow.add_step("pre_process", pre_process)
    multiplication_workflow.add_step("add_loop", add_loop)
    multiplication_workflow.add_step("post_process", post_process)

    response = await multiplication_workflow.run(State(x=8, y=5))
    print(f"result: {response.state.result}")

    response = await multiplication_workflow.run(State(x=8, y=-5))
    print(f"result: {response.state.result}")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

```

{/* <!-- embedme typescript/examples/workflows/nesting.ts --> */}
```ts TypeScript [expandable]
import { Workflow } from "beeai-framework/workflows/workflow";
import { z } from "zod";

const schema = z.object({
  threshold: z.number().min(0).max(1),
  counter: z.number().default(0),
});

const addFlow = new Workflow({ schema }).addStep("run", async (state) => {
  state.counter += 1;
  return Math.random() > 0.5 ? Workflow.SELF : Workflow.END;
});

const subtractFlow = new Workflow({
  schema,
}).addStep("run", async (state) => {
  state.counter -= 1;
  return Math.random() > 0.5 ? Workflow.SELF : Workflow.END;
});

const workflow = new Workflow({
  schema,
})
  .addStep("start", (state) =>
    Math.random() > state.threshold ? "delegateAdd" : "delegateSubtract",
  )
  .addStep("delegateAdd", addFlow.asStep({ next: Workflow.END }))
  .addStep("delegateSubtract", subtractFlow.asStep({ next: Workflow.END }));

const response = await workflow.run({ threshold: 0.5 }).observe((emitter) => {
  emitter.on("start", (data, event) =>
    console.log(`-> step ${data.step}`, event.trace?.parentRunId ? "(nested flow)" : ""),
  );
});
console.info(`Counter:`, response.result);

```

</CodeGroup>

The Python workflow demonstrates several concepts:
- Implementing loops by returning `Workflow.SELF`
- Conditional transitions between steps
- Progressive state modification to accumulate results
- Sign handling through state transformation
- Type-safe step transitions using Literal types
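
As a cross-check on what the Python workflow computes, the same three phases can be written as a plain function. This is only an illustrative equivalent; `multiply_by_addition` is a hypothetical name and not part of the framework.

```python
def multiply_by_addition(x: int, y: int) -> int:
    # pre_process: work with the magnitude of y
    repetitions = abs(y)
    # Note: the workflow leaves `result` as None when y == 0;
    # this sketch starts from 0 instead.
    result = 0
    # add_loop: each iteration corresponds to one Workflow.SELF transition
    while repetitions > 0:
        result += x
        repetitions -= 1
    # post_process: restore the sign of the product
    return -result if y < 0 else result


print(multiply_by_addition(8, 5))   # 40
print(multiply_by_addition(8, -5))  # -40
```

The two calls mirror the two `run` invocations in the Python example, so the workflow should print the same results.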

---

### Multi-Agent Workflows

The multi-agent workflow pattern enables the orchestration of specialized agents that collaborate to solve complex problems. Each agent focuses on a specific domain or capability, with results combined by a coordinator agent.

<CodeGroup>

{/* <!-- embedme python/examples/workflows/multi_agents.py --> */}
```py Python [expandable]
import asyncio
import sys
import traceback

from beeai_framework.backend import ChatModel
from beeai_framework.emitter import EmitterOptions
from beeai_framework.errors import FrameworkError
from beeai_framework.tools.search.wikipedia import WikipediaTool
from beeai_framework.tools.weather import OpenMeteoTool
from beeai_framework.workflows.agent import AgentWorkflow, AgentWorkflowInput
from examples.helpers.io import ConsoleReader


async def main() -> None:
    llm = ChatModel.from_name("ollama:llama3.1")
    workflow = AgentWorkflow(name="Smart assistant")

    workflow.add_agent(
        name="Researcher",
        role="A diligent researcher.",
        instructions="You look up and provide information about a specific topic.",
        tools=[WikipediaTool()],
        llm=llm,
    )

    workflow.add_agent(
        name="WeatherForecaster",
        role="A weather reporter.",
        instructions="You provide detailed weather reports.",
        tools=[OpenMeteoTool()],
        llm=llm,
    )

    workflow.add_agent(
        name="DataSynthesizer",
        role="A meticulous and creative data synthesizer",
        instructions="You can combine disparate information into a final coherent summary.",
        llm=llm,
    )

    reader = ConsoleReader()

    reader.write("Assistant 🤖 : ", "What location do you want to learn about?")
    for prompt in reader:
        await (
            workflow.run(
                inputs=[
                    AgentWorkflowInput(prompt="Provide a short history of the location.", context=prompt),
                    AgentWorkflowInput(
                        prompt="Provide a comprehensive weather summary for the location today.",
                        expected_output="Essential weather details such as chance of rain, temperature and wind. Only report information that is available.",
                    ),
                    AgentWorkflowInput(
                        prompt="Summarize the historical and weather data for the location.",
                        expected_output="A paragraph that describes the history of the location, followed by the current weather conditions.",
                    ),
                ]
            )
            .on(
                # Event Matcher -> match 'success' events emitted by a ChatModel
                lambda event: isinstance(event.creator, ChatModel) and event.name == "success",
                # log data to the console
                lambda data, event: reader.write(
                    "->Got response from the LLM",
                    "  \n->".join([str(message.content[0].model_dump()) for message in data.value.messages]),
                ),
                EmitterOptions(match_nested=True),
            )
            .on(
                "success",
                lambda data, event: reader.write(
                    f"->Step '{data.step}' has been completed with the following outcome."
                    f"\n\n{data.state.final_answer}\n\n",
                    data.model_dump(exclude={"data"}),
                ),
            )
        )
        reader.write("Assistant 🤖 : ", "What location do you want to learn about?")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

```

{/* <!-- embedme typescript/examples/workflows/multiAgents.ts --> */}
```ts TypeScript [expandable]
import "dotenv/config";
import { createConsoleReader } from "examples/helpers/io.js";
import { OpenMeteoTool } from "beeai-framework/tools/weather/openMeteo";
import { WikipediaTool } from "beeai-framework/tools/search/wikipedia";
import { AgentWorkflow } from "beeai-framework/workflows/agent";
import { OllamaChatModel } from "beeai-framework/adapters/ollama/backend/chat";

const workflow = new AgentWorkflow("Smart assistant");
const llm = new OllamaChatModel("llama3.1");

workflow.addAgent({
  name: "Researcher",
  role: "A diligent researcher",
  instructions: "You look up and provide information about a specific topic.",
  tools: [new WikipediaTool()],
  llm,
});
workflow.addAgent({
  name: "WeatherForecaster",
  role: "A weather reporter",
  instructions: "You provide detailed weather reports.",
  tools: [new OpenMeteoTool()],
  llm,
});
workflow.addAgent({
  name: "DataSynthesizer",
  role: "A meticulous and creative data synthesizer",
  instructions: "You can combine disparate information into a final coherent summary.",
  llm,
});

const reader = createConsoleReader();
reader.write("Assistant 🤖 : ", "What location do you want to learn about?");
for await (const { prompt } of reader) {
  const { result } = await workflow
    .run([
      { prompt: "Provide a short history of the location.", context: prompt },
      {
        prompt: "Provide a comprehensive weather summary for the location today.",
        expectedOutput:
          "Essential weather details such as chance of rain, temperature and wind. Only report information that is available.",
      },
      {
        prompt: "Summarize the historical and weather data for the location.",
        expectedOutput:
          "A paragraph that describes the history of the location, followed by the current weather conditions.",
      },
    ])
    .observe((emitter) => {
      emitter.on("success", (data) => {
        reader.write(
          `Step '${data.step}' has been completed with the following outcome:\n`,
          data.state?.finalAnswer ?? "-",
        );
      });
    });

  reader.write(`Assistant 🤖`, result.finalAnswer);
  reader.write("Assistant 🤖 : ", "What location do you want to learn about?");
}

```

</CodeGroup>

This pattern demonstrates:

- Role specialization through focused agent configuration
- Efficient tool distribution to relevant specialists
- Processing of different aspects of a query as separate inputs, in the order given
- Synthesis of multiple expert perspectives into a cohesive response
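
The data flow of this pattern can be pictured with stub agents that need no LLM. The sketch below is only an illustration of specialists feeding a synthesizer: it does not use or mirror `AgentWorkflow` internals, and the `Agent` signature, the stub functions, and `run_pipeline` are all hypothetical.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical agent signature for this sketch: (task, earlier outputs) -> output
Agent = Callable[[str, list[str]], Awaitable[str]]


async def researcher(task: str, earlier: list[str]) -> str:
    return f"[history] {task}"


async def forecaster(task: str, earlier: list[str]) -> str:
    return f"[weather] {task}"


async def synthesizer(task: str, earlier: list[str]) -> str:
    # The final agent only combines what the specialists produced.
    return " | ".join(earlier)


async def run_pipeline(stages: list[tuple[Agent, str]]) -> str:
    outputs: list[str] = []
    # Stages run one after another; each sees the outputs produced so far.
    for agent, task in stages:
        outputs.append(await agent(task, outputs))
    return outputs[-1]


summary = asyncio.run(
    run_pipeline(
        [
            (researcher, "Prague"),
            (forecaster, "Prague"),
            (synthesizer, "Summarize"),
        ]
    )
)
print(summary)  # [history] Prague | [weather] Prague
```

The stages run in order here because the last one depends on the first two, just as the summary input in the example above depends on the history and weather inputs.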

<Tip>
See the [events documentation](/modules/events) for more information on standard emitter events.
</Tip>

### Memory in Workflows

Integrating memory into workflows allows agents to maintain context across interactions, enabling conversational interfaces and stateful processing. This example demonstrates a simple conversational workflow that replies with the user's last message reversed while keeping the whole exchange in memory.

<CodeGroup>

{/* <!-- embedme python/examples/workflows/memory.py --> */}
```py Python [expandable]
import asyncio
import sys
import traceback

from pydantic import BaseModel, InstanceOf

from beeai_framework.backend import AssistantMessage, UserMessage
from beeai_framework.errors import FrameworkError
from beeai_framework.memory import UnconstrainedMemory
from beeai_framework.workflows import Workflow
from examples.helpers.io import ConsoleReader


async def main() -> None:
    # State with memory
    class State(BaseModel):
        memory: InstanceOf[UnconstrainedMemory]
        output: str = ""

    async def echo(state: State) -> str:
        # Get the last message in memory
        last_message = state.memory.messages[-1]
        state.output = last_message.text[::-1]
        return Workflow.END

    reader = ConsoleReader()

    memory = UnconstrainedMemory()
    workflow = Workflow(State)
    workflow.add_step("echo", echo)

    for prompt in reader:
        # Add user message to memory
        await memory.add(UserMessage(content=prompt))
        # Run workflow with memory
        response = await workflow.run(State(memory=memory))
        # Add assistant response to memory
        await memory.add(AssistantMessage(content=response.state.output))

        reader.write("Assistant 🤖 : ", response.state.output)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

```

{/* <!-- todo typescript/examples/workflows/memory.ts --> */}
```ts TypeScript [expandable]
Example coming soon
```

</CodeGroup>

This pattern demonstrates:
- Integration of memory as a first-class citizen in workflow state
- Conversation loops that preserve context across interactions
- Bidirectional memory updating (reading recent messages, storing responses)
- Clean separation between the persistent memory and workflow-specific state
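
The conversation loop can also be traced without the framework. The following dependency-free sketch assumes a plain list-backed `Memory` as a stand-in for `UnconstrainedMemory`; the `Message` and `Memory` classes here are hypothetical simplifications, not the framework's types.

```python
from dataclasses import dataclass, field


@dataclass
class Message:
    role: str
    text: str


@dataclass
class Memory:
    # Stand-in for UnconstrainedMemory: an unbounded list of messages.
    messages: list[Message] = field(default_factory=list)

    def add(self, message: Message) -> None:
        self.messages.append(message)


def echo(memory: Memory) -> str:
    # Same logic as the `echo` step above: reverse the latest message.
    return memory.messages[-1].text[::-1]


memory = Memory()
replies = []
for prompt in ["hello", "abc"]:
    memory.add(Message("user", prompt))      # store the user turn
    reply = echo(memory)                     # read from memory
    memory.add(Message("assistant", reply))  # store the response
    replies.append(reply)

print(replies)               # ['olleh', 'cba']
print(len(memory.messages))  # 4
```

Because the same `memory` object is reused on every turn, it accumulates both sides of the conversation, while each reply is computed fresh from the latest message.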

---

## Examples

<CardGroup cols={2}>
  <Card title="Python" icon="python" href="https://github.com/i-am-bee/beeai-framework/tree/main/python/examples/workflows">
    Explore reference workflow implementations in Python
  </Card>
  <Card title="TypeScript" icon="js" href="https://github.com/i-am-bee/beeai-framework/tree/main/typescript/examples/workflows">
    Explore reference workflow implementations in TypeScript
  </Card>
</CardGroup>
